package com.gitee.funcy.disruptorlearn.ch03.sigle;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author fangchengyan
 * @date 2019-03-15 17:53
 */
public class Main {

    public static void main(String[] args) throws Exception {
        //构建线程池用于提交任务
        ExecutorService service = Executors.newFixedThreadPool(4);
        ExecutorService service2 = Executors.newFixedThreadPool(5);

        //1. 构建 Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    @Override
                    public Trade newInstance() {
                        return new Trade();
                    }
                },
                1024 * 1024,
                service2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());

        // 2. 把消费者设置到disruptor中handleEventsWith

        /*
        //2.1 串行操作
        disruptor
                .handleEventsWith(new Handler1())
                .handleEventsWith(new Handler2())
                .handleEventsWith(new Handler3());
        */

        //2.2 并行操作
//        disruptor.handleEventsWith(new Handler1());
//        disruptor.handleEventsWith(new Handler2());
//        disruptor.handleEventsWith(new Handler3());
        //--------
//        disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());

        /*
         * 2.3 菱形操作
         *        H1,H2
         *       /     \
         * start        end
         *       \    /
         *         H3
         *
         */
//        disruptor.handleEventsWith(new Handler1(), new Handler2())
//                .handleEventsWith(new Handler3());
        //---------
//        disruptor.handleEventsWith(new Handler1(), new Handler2())
//                .then(new Handler3());

        /*
         * 2.4 六边形操作
         *
         *        H1---H2
         *       /       \
         * start          H3
         *       \       /
         *        H4---H5
         *
         */
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h4);
        disruptor.after(h1).handleEventsWith(h2);
        disruptor.after(h4).handleEventsWith(h5);
        disruptor.after(h2, h5).handleEventsWith(h3);

        // 3. 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        CountDownLatch latch = new CountDownLatch(1);

        long start = System.currentTimeMillis();

        service.submit(new TradePublisher(latch, disruptor));

        latch.await();

        disruptor.shutdown();
        service.shutdown();
        service2.shutdown();

        System.out.println("总耗时：" + (System.currentTimeMillis() - start) + " ms!");
    }

}

















